package cn.ms.neural.moduler.flowrate.core;

import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

import cn.ms.neural.common.exception.flowrate.FlowrateException;
import cn.ms.neural.common.exception.flowrate.support.QPSRejectREQException;
import cn.ms.neural.common.utils.StringUtils;
import cn.ms.neural.entity.NeuralConf;
import cn.ms.neural.moduler.flowrate.IFlowrate;
import cn.ms.neural.moduler.flowrate.core.ratelimiter.NIORateLimiterFlowrate;
import cn.ms.neural.moduler.flowrate.core.semaphore.NIOSemaphoreFlowrate;
import cn.ms.neural.moduler.flowrate.entity.FlowrateRule;
import cn.ms.neural.moduler.flowrate.handler.IFlowrateHandler;

/**
 * 限流 <br>
 * 并发数<br>
 * QPS:每秒钟、每分钟、每小时、每一天的<br>
 * <br>
 * 
 * @author lry
 *
 * @param <REQ>
 * @param <RES>
 */
public class FlowrateFactory<REQ, RES> implements IFlowrate<REQ, RES> {

	/**
	 * 流控规则
	 */
	private final ConcurrentHashMap<String, FlowrateRule> flowrateRuleMap = new ConcurrentHashMap<String, FlowrateRule>();
	/**
	 * QPS流控
	 */
	private final ConcurrentHashMap<String, NIORateLimiterFlowrate<REQ, RES>> nioRateLimiterFlowrateMap = new ConcurrentHashMap<String, NIORateLimiterFlowrate<REQ, RES>>();
	/**
	 * 并发流控
	 */
	private final ConcurrentHashMap<String, NIOSemaphoreFlowrate<REQ, RES>> nioSemaphoreFlowrateMap = new ConcurrentHashMap<String, NIOSemaphoreFlowrate<REQ, RES>>();

	public FlowrateFactory(List<FlowrateRule> flowrateRules) {
		if (flowrateRules == null) {
			return;
		}

		for (FlowrateRule flowrateRule : flowrateRules) {
			flowrateRuleMap.put(flowrateRule.getKey(), flowrateRule);
		}
	}

	/**
	 * 初始化
	 */
	public void init() throws Throwable {

	}

	/**
	 * 流控
	 */
	public RES flowrate(NeuralConf flowrateConf, REQ flowrateREQ, final IFlowrateHandler<REQ, RES> flowrateHandler)
			throws Throwable {
		if (!flowrateConf.isFlowrateEnable()) {// 流控总开关
			return flowrateHandler.handler(flowrateConf, flowrateREQ);
		}

		// 校验参数
		if (flowrateConf == null || flowrateREQ == null || StringUtils.isBlank(flowrateConf.getNeuralId())) {// 忽略非法请求的流控
			return flowrateHandler.handler(flowrateConf, flowrateREQ);
		}

		final FlowrateRule flowrateRule = flowrateRuleMap.get(flowrateConf.getNeuralId());
		if (flowrateRule != null) {// 为空则不需要流控
			switch (flowrateRule.getFlowrateType()) {
			case CCT:// 并发流控
				return this.cct(flowrateConf, flowrateREQ, flowrateRule, flowrateHandler);
			case QPS:// 速率流控
				return this.qps(flowrateConf, flowrateREQ, flowrateRule, flowrateHandler);
			case CCT_QPS:// 并发、速率流控
				return this.cct(flowrateConf, flowrateREQ, flowrateRule, new IFlowrateHandler<REQ, RES>() {
					@Override
					public RES handler(NeuralConf flowrateConf, REQ flowrateREQ) throws Throwable {
						return qps(flowrateConf, flowrateREQ, flowrateRule, flowrateHandler);
					}
				});
			default:
				break;
			}
		} else {// 未制定规则
			return flowrateHandler.handler(flowrateConf, flowrateREQ);
		}
		throw new FlowrateException("流控异常");
	}

	/**
	 * 并发控制
	 * 
	 * @param flowrateConf
	 * @param flowrateREQ
	 * @param flowrateRule
	 * @param flowrateHandler
	 * @return
	 * @throws Throwable
	 */
	private RES cct(NeuralConf flowrateConf, REQ flowrateREQ, FlowrateRule flowrateRule, IFlowrateHandler<REQ, RES> flowrateHandler) throws Throwable {
		NIOSemaphoreFlowrate<REQ, RES> nioSemaphoreFlowrate = nioSemaphoreFlowrateMap.get(flowrateConf.getNeuralId());
		if (nioSemaphoreFlowrate == null) {
			nioSemaphoreFlowrate = new NIOSemaphoreFlowrate<REQ, RES>(flowrateRule);
		}
		return nioSemaphoreFlowrate.semaphore(flowrateConf, flowrateREQ, flowrateHandler);
	}

	/**
	 * QPS控制
	 * 
	 * @param flowrateConf
	 * @param flowrateREQ
	 * @param flowrateRule
	 * @param flowrateHandler
	 * @return
	 * @throws Throwable
	 */
	private RES qps(NeuralConf flowrateConf, REQ flowrateREQ, FlowrateRule flowrateRule, IFlowrateHandler<REQ, RES> flowrateHandler) throws Throwable {
		if (flowrateRule.getPermitsPerSecond() == 0.0) {// 服务不限流
			return flowrateHandler.handler(flowrateConf, flowrateREQ);
		}
		
		NIORateLimiterFlowrate<REQ, RES> nioRateLimiterFlowrate = nioRateLimiterFlowrateMap.get(flowrateConf.getNeuralId());
		if (nioRateLimiterFlowrate == null) {
			nioRateLimiterFlowrate = new NIORateLimiterFlowrate<REQ, RES>(flowrateRule);
		}
		boolean rateLimiterCheck = nioRateLimiterFlowrate.rateLimiter();// QPS流控校验器
		if (rateLimiterCheck) {// 放通
			return flowrateHandler.handler(flowrateConf, flowrateREQ);
		} else {// QPS拒绝
			throw new QPSRejectREQException("QPS流控拒绝");
		}
	}

	/**
	 * 销毁
	 */
	public void destory() throws Throwable {

	}

}
